perf: skip concurrency overhead in BaseSingleBlockCombineOperator when numTasks=1#18146
perf: skip concurrency overhead in BaseSingleBlockCombineOperator when numTasks=1#18146deeppatel710 wants to merge 4 commits intoapache:masterfrom
Conversation
…n numTasks=1 When a query runs with a single execution task (e.g. one segment or maxExecutionThreads=1), BaseSingleBlockCombineOperator still incurred the full multi-thread overhead: ExecutorService.submit(), Phaser registration/ deregistration, BlockingQueue.poll() with timeout, AtomicInteger, and AtomicReference. This adds a single-thread fast path in getNextBlock(): when _numTasks==1 and _resultsBlockMerger is non-null (i.e. the subclass uses the default merge strategy), segments are processed sequentially on the calling thread with none of that synchronization overhead. CPU time and memory are still tracked via ThreadResourceSnapshot. Subclasses that override mergeResults() with custom logic (e.g. SequentialSortedGroupByCombineOperator, which passes null for _resultsBlockMerger) are unaffected and continue using the standard path. Fixes apache#14617 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
xiangfu0
left a comment
There was a problem hiding this comment.
I found a critical correctness issue with the fast path implementation:
Timeout Handling Missing: The new getNextBlockSingleThread() method lacks timeout protection that exists in the original mergeResults() method. The original method checks _queryContext.getEndTimeMs() and returns a timeout results block if the deadline is exceeded. The fast path has no such protection, which means:
- A hanging segment operator could block indefinitely
- No respect for query timeout deadline
- Potential resource exhaustion if a segment operator stalls
The original mergeResults() path handles this via the _blockingQueue.poll(waitTimeMs, TimeUnit.MILLISECONDS) with explicit timeout checking. The fast path should implement similar timeout protection.
| } | ||
| } | ||
|
|
||
| /// Processes all segments sequentially on the calling thread when only one task is needed. |
There was a problem hiding this comment.
Missing timeout protection. This fast path should respect _queryContext.getEndTimeMs() like the original mergeResults() method does. A hanging segment operator could block indefinitely here without timeout checking.
There was a problem hiding this comment.
Good catch. Added a System.currentTimeMillis() >= endTimeMs check at the top of each loop iteration before invoking operator.nextBlock(). If the deadline is exceeded, getTimeoutResultsBlock(i) is returned
immediately — exactly mirroring the waitTimeMs <= 0 guard in mergeResults(). A stalled segment operator will now be bypassed at the next iteration boundary rather than blocking indefinitely.
…ombineOperator The getNextBlockSingleThread() fast path was missing the timeout protection present in the original mergeResults() method. A stalled segment operator could block indefinitely with no deadline enforcement. Fix: check System.currentTimeMillis() >= endTimeMs before invoking each segment operator. If the deadline is exceeded, return a timeout results block immediately. This mirrors the waitTimeMs <= 0 guard in mergeResults(). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #18146 +/- ##
============================================
+ Coverage 63.04% 63.39% +0.34%
- Complexity 1617 1627 +10
============================================
Files 3202 3229 +27
Lines 194718 196730 +2012
Branches 30047 30415 +368
============================================
+ Hits 122760 124714 +1954
+ Misses 62233 62016 -217
- Partials 9725 10000 +275
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…hread fast path Two bugs in getNextBlockSingleThread(): 1. Resource stats double-counting: The method accumulated _totalWorkerThreadCpuTimeNs on the calling thread, but InstanceResponseOperator.getBaseBlock() already captures the same thread's CPU time as mainThreadCpuTimeNs. The calSystemActivitiesCpuTimeNs formula (wallClock - mainThread - workerThread/N) then subtracted the same work twice, producing a negative value clamped to 0. This broke testResourceUsageStats in the CPU/memory query killing integration tests. Fix: remove the ThreadResourceSnapshot tracking; the main thread's snapshot already accounts for all work done in the single-thread path. 2. Exception handling for operator failures: wrapOperatorException() throws (not returns) a new QueryException for most RuntimeExceptions. Calling it as an argument to createExceptionResultsBlock- AndAttachExecutionStats caused the thrown exception to escape the catch block uncaught, propagating to callers instead of being wrapped in an error block. This broke JsonExtractScalarTransformFunctionTest.mvWithNullsWithoutDefault. Fix: wrap in try/throw/catch to capture whatever wrapOperatorException throws or returns, then convert to an error block — mirroring the multi- thread path's onProcessSegmentsException handler. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Summary
Fixes #14617
When a query runs with a single execution task (one segment, or
maxExecutionThreads=1),BaseSingleBlockCombineOperatorstill incurred the full multi-thread overhead:ExecutorService.submit()— thread pool task submissionPhaser— register/deregister synchronizationBlockingQueue.poll()— with timeout waitingAtomicInteger/AtomicReference— volatile reads/writes on the hot pathNone of this is necessary when
_numTasks == 1.Change
Added a
getNextBlockSingleThread()fast path inBaseSingleBlockCombineOperator.getNextBlock(): when_numTasks == 1and_resultsBlockMergeris non-null, all segments are processed sequentially on thecalling thread with no synchronization overhead. CPU time and memory allocation are still tracked via
ThreadResourceSnapshot.Subclasses that override
mergeResults()with custom logic (e.g.SequentialSortedGroupByCombineOperator, which passesnullfor_resultsBlockMerger) are unaffected and fall through to the standardmulti-thread path.
Test plan
CombineSlowOperatorsTest,CombineErrorOperatorsTest,SelectionCombineOperatorTest,SortedGroupByCombineOperatorsTest,CombinePlanNodeTest— all green